We will cover a very basic, deliberately naive take on what asynchronous programming is like. However, I believe the example we explore will give the reader a good picture of the building blocks of a powerful and complex technique.
Enjoy!
A service for fetching news
Imagine we work in a startup! The startup wants to build this really cool new service where users input a topic into a search field and get a bunch of news collected from the best online news sites there are. We are the back-end engineering team and we are tasked with building the core of this fantastic new product – the news aggregator. Luckily for us, all of the online news agencies we will be querying provide nice APIs. All we need to do is, for each requested topic, make a call to each of the APIs, collect and format the data so it’s readable by our front-end, and send it to the client. The front-end team takes care of displaying it to the user. As with any startup, hitting the market super fast is of crucial importance, so we create the simplest possible script and release our new product. Below is the script of our engine.
// Fetch each provider's news for the topic (URLs reconstructed from the
// service list shown later in the article; the Africa one is assumed)
$europe_news = file_get_contents(sprintf('https://api.europe-news.org?q=%s', $topic));
$asia_news = file_get_contents(sprintf('https://api.asia-news.org?s=%s', $topic));
$africa_news = file_get_contents(sprintf('https://api.africa-news.org?q=%s', $topic));
$formatted = [
    'europe_news' => format_europe($europe_news),
    'asia_news' => format_asia($asia_news),
    'africa_news' => format_africa($africa_news)
];
echo json_encode($formatted);
This is as simple as it gets! We give a big “Thank you” to the creators of PHP for making the wonderful file_get_contents() function which drives our API communications and we launch our first version.
Our product proves to be useful and the number of clients using it increases from day to day. As our business expands, so does the demand for news from the Americas and from some other regions. Our engine is easy to extend, so we add news from the respective news services in a matter of minutes. However, with each additional news service, our aggregator gets slower and slower.
A couple of months later our first competitor appears on the market. They provide the exact same product, only it’s blazingly fast. We now have to quickly come up with a way to drastically improve our response time. We try upgrading our servers, scaling horizontally with more machines, paying for a faster Internet connection, but still we don’t get even close to the incredible performance of our competitor. We are in trouble and we need to figure out what to do!
The Synchronous nature of PHP
Most of you have probably already noticed what is going on in our “engine” and why adding more news sites makes things slower and slower. Whenever we make a call to a news service in our script, we wait for the call to complete before we make the next call. The more services we add, the more we have to wait. This is because the built-in tools that PHP provides us with are in their nature designed for a synchronous programming flow. This means that operations are done in a strict order and each operation we start must first end before the next one starts. This makes the programming experience nice, as it is really easy to follow and to reason about the flow. Also, most of the time a synchronous flow fits perfectly with our goals. However, in this particular example, the synchronous flow of our program is what in fact slows it down. Downloading data from external services is a slow operation and we have a bunch of downloads. However, nothing in our program requires the downloads to be done sequentially. If we could do the downloads concurrently, this would drastically improve the overall speed of our service.
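To get a feel for the cost, here is a minimal sketch of how the waiting adds up when the calls are made one after the other (the URLs and timings are placeholders):

// A sketch: if each of the services takes ~2 seconds to respond, the
// sequential version takes ~2 seconds per service (placeholder URLs)
$start = microtime(true);
$urls = ['https://api.europe-news.org?q=php', 'https://api.asia-news.org?s=php'];
foreach ($urls as $url) {
    file_get_contents($url); // blocks until this response is fully received
}
printf("Fetched all services in %.2f seconds\n", microtime(true) - $start);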
A little bit about I/O operations
Before we continue, let’s talk a little about what happens when we perform any input/output operation. Whether we are working with a local file, talking to a device in our computer or communicating over a network, the flow is pretty much the same. It goes something like this.
When sending/writing data…
1. There is some sort of memory which acts as an output buffer. It may be allocated in the RAM or it may be memory on the device we are talking to. In any case, this output buffer is limited in size.
2. We write some of the data we want to send to the output buffer.
3. We wait for the data in the output buffer to get sent/written to the device with which we are communicating.
4. Once this is done, we check if there is more data to send/write. If there is, we go back to step 2. If not, we go back to whatever we were doing immediately before we requested the output operation (we return). This write loop is sketched in code below.
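In PHP code, the write loop might be sketched like this ($fp and $data stand for an already opened stream and a payload, both assumed):

// A sketch of the write flow: fwrite() may accept only part of $data when
// the output buffer is full, so we keep writing (and waiting) until done
$written = 0;
while ($written < strlen($data)) {
    $written += fwrite($fp, substr($data, $written));
}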
When we receive data a similar process occurs.
1. There is an input buffer. It, too, is limited in size.
2. We make a request to read some data.
3. We wait while the data is being read and placed into the input buffer.
4. Once a chunk of data is available, we append its contents to our own memory (in a variable, probably).
5. If we expect more data to be received, we go back to step 3. Otherwise, we return the read data to the procedure which requested it and carry on from where we left off.
Notice that in each of the flows there is a point in which we wait. The waiting point is also in a loop, so we wait multiple times, accumulating waiting time. And because output and input operations are super-slow compared to the working speed of our CPU, waiting is what the CPU ends up spending most of its time doing. Needless to say, it doesn’t matter how fast our CPU or PHP engine is when all they’re doing is waiting for other slow things to finish.
Lucky for us, there is something we can do.
The above processes describe what we call blocking I/O operations. We call them blocking because, when we send or receive data, the flow of the rest of the program blocks until the operation is finished. However, we are not in fact required to wait for it to finish. When we write to the buffer, we can write some data and, instead of waiting for it to be sent, do something else and come back to write more data later. Similarly, when we read from an input buffer, we can just take whatever data is in it and continue doing something else. At a later point we can revisit the input buffer and get some more data if any is available. I/O operations which allow us to do that are called non-blocking. If we start using non-blocking instead of blocking operations, we can achieve the concurrency we are after.
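Here is a minimal sketch of the difference (example.com is a placeholder host):

// In non-blocking mode a read returns immediately with whatever data is
// available, possibly an empty string
$fp = stream_socket_client("tcp://example.com:80", $errno, $errstr, 30);
fwrite($fp, "GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n");
stream_set_blocking($fp, false);   // switch the stream to non-blocking mode
$chunk = fgets($fp, 2048);         // does not wait for data to arrive
// ... go do something else, then come back and read some more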
Concurrently downloading files
At this point it would be a good idea for our team to look into the existing tools for concurrent asynchronous programming in PHP, like ReactPHP and AMPHP. However, our team is imaginary and has the lead role in a proof-of-concept article, so they are going to take the crooked path and try to reinvent the wheel.
Now that we know what blocking and non-blocking I/O operations are, we can actually start making progress. Currently, when we fetch data from the news services, our flow looks like the following:
- Get all the data from service 1
- Get all the data from service 2
- Get all the data from service 3
- …
- Get all the data from service n

Instead, the flow we want to have would look something like the following:
- Get a little bit of data from service 1
- Get a little bit of data from service 2
- Get a little bit of data from service 3
- …
- Get a little bit of data from service n
- Get a little bit of data from service 1
- Get a little bit of data from service 3
- …
- Get a little bit of data from service 2
- …
- We have collected all the data
In order to achieve this, we first need to get rid of file_get_contents().
Reimplementing file_get_contents()
The file_get_contents() function is a blocking one. As such, we need to replace it with a non-blocking version. We will start by re-implementing its current behavior and then gradually refactor towards our goal.
Below is our drop-in replacement for file_get_contents().
function fetchUrl(string $url) {
    $parts = parse_url($url);
    $host = $parts['host'];
    $path = ($parts['path'] ?? '/') . (isset($parts['query']) ? '?' . $parts['query'] : '');
    // Open a TCP connection to the server (30-second timeout)
    $fp = @stream_socket_client("tcp://$host:80", $errno, $errstr, 30);
    if (!$fp) {
        throw new Exception($errstr);
    }
    // Put the stream into non-blocking mode
    stream_set_blocking($fp, false);
    // "Connection: close" makes the server close the socket when the response
    // is complete, so that feof() below can detect the end of the stream
    fwrite($fp, "GET $path HTTP/1.1\r\nHost: $host\r\nAccept: */*\r\nConnection: close\r\n\r\n");
    $content = '';
    while (!feof($fp)) {
        $bytes = fgets($fp, 2048);
        $content .= $bytes;
    }
    return $content;
}
Let’s break down what is happening:
1. We open a TCP socket to the server we want to contact.
2. We throw an exception if there is an error.
3. We set the socket stream to non-blocking.
4. We write an HTTP request to the socket.
5. We define a variable $content in which to store the response.
6. We read some data from the socket and append it to the response received so far.
7. We repeat step 6 until we reach the end of the stream.
Note the stream_set_blocking() call we make. It sets the stream to non-blocking mode. We feel the effect of this when we later call fgets(). The second parameter we pass to fgets() is the maximum number of bytes to read from the input buffer (in our case – 2048). If the stream is in blocking mode, fgets() will block until it can give us 2048 bytes or until the stream is over. In non-blocking mode, fgets() will return whatever is in the buffer (but no more than 2048 bytes) and will not wait if this is less than 2048 bytes.
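The difference can be sketched as follows (assuming $fp is a socket whose response has not fully arrived yet):

stream_set_blocking($fp, true);
$data = fgets($fp, 2048);   // waits until bytes arrive or the stream ends
stream_set_blocking($fp, false);
$data = fgets($fp, 2048);   // returns immediately, possibly with nothing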
Although we are now using non-blocking input this function still behaves as the original file_get_contents(). Because of the loop in it, once we call it, we will be stuck until it’s complete. We need to get rid of this loop, or rather – move it out of the function.
We can break down what the function does in four steps:
- Initialization – opening the socket and writing the request
- Checking if we’ve reached the end of the stream
- Reading some data if not
- Returning the data if yes
Disregarding the loop, we can organize those parts in a class. The first three steps we will implement as methods, and instead of returning the data, we will simply expose the buffer as public.
class URLFetcher
{
    public string $content = '';
    private $fp;

    public function __construct(private string $url) {}

    public function start(): void {
        $parts = parse_url($this->url);
        $host = $parts['host'];
        $path = ($parts['path'] ?? '/') . (isset($parts['query']) ? '?' . $parts['query'] : '');
        $this->fp = @stream_socket_client("tcp://$host:80", $errno, $errstr, 30);
        if (!$this->fp) {
            throw new Exception($errstr);
        }
        stream_set_blocking($this->fp, false);
        fwrite($this->fp, "GET $path HTTP/1.1\r\nHost: $host\r\nAccept: */*\r\nConnection: close\r\n\r\n");
    }

    public function readSomeBytes(): void {
        $this->content .= fgets($this->fp, 2048);
    }

    public function isDone(): bool {
        return feof($this->fp);
    }
}
Rebuilding the loop
Now we need to rebuild the loop. This time, instead of executing one loop per file, we want to have multiple files in one loop.
Because we now have many news services to fetch data from, we have refactored our initial code to hold their names and URLs in an array.
$services = [
'europe' => 'https://api.europe-news.org?q=%s',
'asia' => 'https://api.asia-news.org?s=%s'
...
];
For each service we will create a URLFetcher and ‘start’ it. We will also keep a reference to each of the fetchers.
$fetchers = [];
foreach ($services as $name => $url) {
$fetcher = new URLFetcher(sprintf($url, $topic));
$fetcher->start();
$fetchers[$name] = $fetcher;
}
Now we will add the loop in which we will iterate through the fetchers, reading some bytes from each of them upon each iteration.
$finishedFetchers = [];
while (count($finishedFetchers) < count($fetchers)) {
foreach ($fetchers as $name => $fetcher) {
if (!$fetcher->isDone()) {
$fetcher->readSomeBytes();
} else if (!in_array($name, $finishedFetchers)) {
$finishedFetchers[] = $name;
}
}
}
The $finishedFetchers array helps us track which fetchers have finished their work. Once all of the fetchers are done, we exit the loop. The data gathered is accessible through the $content property of each fetcher. This simple way of downloading data concurrently gives us an incredible performance boost.
Having successfully solved our performance issues, we beat the competition and our business continues to grow. With it grow the requirements for our engine.
One of the new features we need to implement in the next quarter is a history of all the topics our users have searched for and the results they got for them. For this we want to use a SQL database, but when we attempt to add it to the mix, the numerous inserts we perform for each topic slow down our service significantly. We already know what the problem is – the execution of the database queries is blocking, and thus each insert delays the execution of everything else. We immediately take action and develop our own implementation of concurrent DB inserts. However, adding those to the loop we have proves to be quite a mess. The inserts need their own looping and tracking, but they also need to be coordinated with the requests to the services, because we cannot do an insert before having the data from the respective news service. Once again, we have to rethink our lives.
Generalizing the Loop
It is easy to see that if we want to take advantage of other non-blocking operations, we need some sort of handy, generic way to add more things to the ‘driving’ loop. We need a loop which makes it possible to dynamically add more things to it for execution. It turns out creating such a loop is quite simple.
class Loop
{
private static array $callbacks = [];
public static function add(callable $callback)
{
self::$callbacks[] = $callback;
}
public static function run()
{
while (count(self::$callbacks)) {
$cb = array_shift(self::$callbacks);
$cb();
}
}
}
The $callbacks array acts as a FIFO queue. At any point in our program we can add functions to it to get executed. Once we call the run() method, functions on the queue will start being executed. The run() method will run until there are no callbacks in the queue. This can potentially be forever as each of the callbacks may add new callbacks while being executed.
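As a quick illustration of the queue mechanics (a made-up example, not part of our engine):

Loop::add(function () {
    echo "first\n";
    // Callbacks may enqueue further callbacks while running
    Loop::add(function () {
        echo "third\n";
    });
});
Loop::add(function () {
    echo "second\n";
});
Loop::run(); // prints: first, second, third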
The next step is to adapt our downloading tools. We can create a small function which works with our URL fetcher class and with the loop.
function fetchUrl(string $url) {
$fetcher = new URLFetcher($url);
Loop::add(fn () => $fetcher->start());
$tick = function () use ($fetcher, &$tick) {
if (!$fetcher->isDone()) {
$fetcher->readSomeBytes();
Loop::add($tick);
}
};
Loop::add($tick);
}
In this new version of fetchUrl() we instantiate a fetcher and add a callback to the loop which will start the download. Then we create a closure which we also add to the loop. When called, the closure checks if the fetcher is done; if it is not, it reads some bytes and adds itself to the loop again. This will ‘drive’ reading from the stream until the end is reached.
All we have to do now is add all our services to the loop and start it:
foreach ($services as $url) {
    fetchUrl(sprintf($url, $topic));
}
Loop::run();
This will indeed download the data from all of the services we need, but we have a major problem – we don’t have any means of getting the results. We cannot get them from fetchUrl(), because it returns before the download has even started. We also want to record the fetched results to the database (remember the new feature we’re implementing) and we want to do this during the download. Otherwise we would have to wait for a new loop for recording things, and this would slow us down.
The solution to our problems is to add one more parameter to fetchUrl() – a callback function which will get called when downloading the data is complete. As a parameter this callback will take the downloaded data and in its body it will initiate the insertion in the database.
Below is the new fetchUrl() with the changes applied:
function fetchUrl(string $url, callable $done) {
$fetcher = new URLFetcher($url);
Loop::add(fn () => $fetcher->start());
$tick = function () use ($fetcher, $done, &$tick) {
if (!$fetcher->isDone()) {
$fetcher->readSomeBytes();
Loop::add($tick);
} else {
$done($fetcher->content);
}
};
Loop::add($tick);
}
And now the updated initialization:
$results = [];
foreach ($services as $name => $url) {
fetchUrl(
sprintf($url, $topic),
function (string $content) use ($name, &$results) {
$results[$name] = $content;
insertIntoDatabase($content);
}
);
}
Loop::run();
The callback now collects the results from the news service and initiates the database insert. The database insert will use similar techniques and will take advantage of the Loop to run concurrently with the other tasks, thus eliminating the need to wait for another loop.
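A sketch of what insertIntoDatabase() might look like, assuming a hypothetical DatabaseInserter class shaped like our URLFetcher (the class and its methods are made up for illustration):

function insertIntoDatabase(string $content): void {
    $inserter = new DatabaseInserter($content); // hypothetical non-blocking inserter
    Loop::add(fn () => $inserter->start());
    $tick = function () use ($inserter, &$tick) {
        if (!$inserter->isDone()) {
            $inserter->continueInsert(); // push some more bytes to the DB server
            Loop::add($tick);
        }
    };
    Loop::add($tick);
}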
Error Handling
There are many things that can go wrong while downloading data from the Internet, but in our example we are only throwing one exception from within the start() method of the URLFetcher. For the sake of simplicity we are going to keep it this way. You may have noticed that so far we haven’t been dealing with this exception at all. Time to address this oversight.
A naive colleague from our imaginary team tried to handle the issue by enclosing the calls to fetchUrl() in a try-catch block like this:
foreach ($services as $name => $url) {
try {
fetchUrl(...);
} catch (Exception $e) {
...
}
}
Loop::run();
The production environment quickly and painfully demonstrated to our team that the exception somehow slipped out of the try-catch block and went on, unhandled, to break our script.
Well, the thing is, fetchUrl() does not actually throw any exceptions. It merely adds callbacks to the loop. One of those callbacks throws an exception (the one initializing the fetcher), but it does not get called until later on. It is only when we start the loop (call Loop::run()) that the exceptions start being thrown. Enclosing the Loop::run() call in a try-catch block will allow us to catch exceptions thrown from within it, but at this level of handling we won’t know what threw them. And even if we did know that, how would we return to the flow of the respective function after handling the error?
The way we can deal with this situation is by adding one more callback parameter to the fetchUrl() function. The new callback will get called whenever an error occurs. So fetchUrl() will look something like this:
function fetchUrl(string $url, callable $done, callable $onerror) {
    $fetcher = new URLFetcher($url);
    Loop::add(function () use ($fetcher, $onerror) {
        try {
            $fetcher->start();
        } catch (Exception $e) {
            $onerror($e);
        }
    });
    $tick = function () use ($fetcher, $done, $onerror, &$tick) {
        if (!$fetcher->isDone()) {
            try {
                $fetcher->readSomeBytes();
            } catch (Exception $e) {
                $onerror($e);
                return; // stop driving this download after an error
            }
            Loop::add($tick);
        } else {
            $done($fetcher->content);
        }
    };
    Loop::add($tick);
}
And respectively the calling code would now look like this:
foreach ($services as $name => $url) {
fetchUrl(
$url,
function (string $content) {...},
function (Exception $e) {...}
);
}
Loop::run();
Now we can handle error situations properly via the new callback.
Retrospect
By the end of the story, in order to allow concurrent operations, our imaginary team had started practicing asynchronous programming in a single-threaded environment based on non-blocking I/O and an event loop. That sentence is bloated with terminology, so I would like to briefly go over the terms.
Concurrency
Both in computing and in general contexts, this means dealing with more than one thing at a time. In our example we were downloading data from multiple Internet services and inserting entries into a database at the same time.
Asynchrony
“Asynchrony, in computer programming, refers to the occurrence of events independent of the main program flow and ways to deal with such events.”
Wikipedia
In our example the main program flow was dealing with downloading data, inserting records, encoding for the clients and sending it to them. The “events” outside of the main flow were in fact the events of new data being available for reading, the successful completion of sending data, etc.
Non-blocking I/O
We based our work on the ability to “query” I/O for its availability. The way we did it was to periodically check if we could use the “device”. This is called polling. Since polling requires CPU cycles, our program becomes more CPU demanding than it needs to be. It would have been smarter if we had “outsourced” the polling to some sort of a lower level “actor” like our operating system or a specialized library. We could then communicate with it via events, interrupts or another mechanism. In any case though, whatever this mechanism for communicating with I/O devices was, at the end of the day it would still be built upon non-blocking I/O polling and maybe hardware interrupts.
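In PHP, one readily available mechanism of this kind is stream_select(), which asks the operating system to put us to sleep until one of our streams is actually ready. A sketch, assuming $streams holds our open sockets:

// Instead of polling each stream ourselves, sleep until at least one
// socket has data to read (or the 5-second timeout passes)
$read = $streams;
$write = $except = null;
if (stream_select($read, $write, $except, 5) > 0) {
    foreach ($read as $fp) {
        $data = fgets($fp, 2048); // only ready streams remain in $read
    }
}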
Event loop
Notice we didn’t call our loop an event loop but just a “loop”. This was intentional, because it would have brought confusion as we hadn’t mentioned events anywhere else in the example. An event loop is just a specific version of a loop like ours. It is designed to work in conjunction with event-based I/O communication and thus the name “event loop”. It allows callbacks to be executed when a certain event occurs, making it more “user-friendly” for the programmer, but essentially it is the same thing. Other names for an event loop are message pump, message dispatcher, run loop and more.
… and last, but not least…
Single-threaded
PHP has a single-threaded model of execution and this will most probably always be the case. This means that all of the instructions to the engine (and from there to the CPU) are executed one-by-one and nothing ever happens in parallel. But wait! We just created a program which downloads data in parallel. True – the downloads happen in parallel but the instructions which control the download flow do not. We simply switch from one download to the other, but never in fact execute two things at the same time. This leads to a problem which we must always keep in mind when doing async single-threaded programming. Because instructions are not in fact executed in parallel, if we throw a heavy computation somewhere inside the loop, everything else will be stuck until the computation is complete.
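We can see this with our own Loop and a deliberately expensive function (a minimal sketch):

// A naive, deliberately slow Fibonacci to simulate a heavy computation
function fib(int $n): int {
    return $n < 2 ? $n : fib($n - 1) + fib($n - 2);
}
Loop::add(function () {
    fib(40); // takes seconds; nothing else in the loop runs meanwhile
    echo "heavy task done\n";
});
Loop::add(function () {
    echo "this had to wait for fib(40) to finish\n";
});
Loop::run();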
Let’s look into another example in order to better illustrate the problem of blocking the loop.
We want to create a server. As any other server does, it will listen on a port and await connections from clients. Once a client is connected they will be able to make some sort of a request to the server and the server will serve it. Of course, we need to be able to serve multiple clients at the same time.
We can use the technique we’ve discussed so far to create such a server. It would open a port and poll it for connections. When a client connects, the server will use non-blocking I/O to communicate with them and will keep switching between this communication, checking for new connections and serving already established connections. However, if, for example, a client asks the server to compute a very long Fibonacci sequence, the server will be stuck on that computation and will not be able to do anything else for the other clients until it finishes. Connections will time out, new ones will not be accepted, etc. Essentially – the server will be gone. If we want our server to execute heavy computational tasks and still be responsive, we need actual parallelism of execution, either by multi-threading or by spawning new processes to carry out the heavy work for us.
So why don’t we do this by default, instead of dealing with this loop-switching business? Because starting threads and processes and switching between them is a lot heavier and slower than staying in one process/thread and doing the switching ourselves. The loop-based approach works perfectly for I/O-heavy, CPU-light programs (and most of what we do falls into this category). If we do need those CPU cycles, however, multi-threading or multi-processing is the way to go.
Final words
These were just the very basic oversimplified building blocks of what asynchronous programming is about. There is a lot more to be said, but this is an article, not a book, so we have to stop somewhere. If you are interested in the topic, I would suggest further research on promises, coroutines and fibers.
Enjoy the rabbit hole that asynchronous programming is!